GVariant *extra_headers;
int max_outstanding;
- /* Queue for libsoup, see bgo#708591 */
- GQueue pending_queue;
+ /* Our active HTTP requests */
GHashTable *outstanding;
/* Shared across threads; be sure to lock. */
} ThreadClosure;
-static void
-session_thread_process_pending_queue (ThreadClosure *thread_closure);
-
typedef struct {
volatile int ref_count;
g_slice_free (IdleClosure, idle_closure);
}
-static int
-pending_task_compare (gconstpointer a,
- gconstpointer b,
- gpointer unused)
-{
- gint priority_a = g_task_get_priority (G_TASK (a));
- gint priority_b = g_task_get_priority (G_TASK (b));
-
- return (priority_a == priority_b) ? 0 :
- (priority_a < priority_b) ? -1 : 1;
-}
-
static OstreeFetcherPendingURI *
pending_uri_ref (OstreeFetcherPendingURI *pending)
{
on_request_sent (GObject *object, GAsyncResult *result, gpointer user_data);
static void
-session_thread_process_pending_queue (ThreadClosure *thread_closure)
+start_pending_request (ThreadClosure *thread_closure,
+ GTask *task)
{
- while (g_queue_peek_head (&thread_closure->pending_queue) != NULL &&
- g_hash_table_size (thread_closure->outstanding) < thread_closure->max_outstanding)
- {
- GTask *task;
- OstreeFetcherPendingURI *pending;
- GCancellable *cancellable;
-
- task = g_queue_pop_head (&thread_closure->pending_queue);
-
- pending = g_task_get_task_data (task);
- cancellable = g_task_get_cancellable (task);
+ OstreeFetcherPendingURI *pending;
+ GCancellable *cancellable;
- g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));
+ g_assert_cmpint (g_hash_table_size (thread_closure->outstanding), <, thread_closure->max_outstanding);
- soup_request_send_async (pending->request,
- cancellable,
- on_request_sent,
- g_object_ref (task));
+ pending = g_task_get_task_data (task);
+ cancellable = g_task_get_cancellable (task);
- g_object_unref (task);
- }
+ g_hash_table_add (thread_closure->outstanding, pending_uri_ref (pending));
+ soup_request_send_async (pending->request,
+ cancellable,
+ on_request_sent,
+ g_object_ref (task));
}
static void
pending->out_tmpfile = tmpfile;
tmpfile = NULL; /* Transfer ownership */
- g_queue_insert_sorted (&thread_closure->pending_queue,
- g_object_ref (task),
- pending_task_compare, NULL);
- session_thread_process_pending_queue (thread_closure);
+ start_pending_request (thread_closure, task);
}
}
* unreference all data related to the SoupSession ourself to ensure
* it's freed in the same thread where it was created. */
g_clear_pointer (&closure->outstanding, g_hash_table_unref);
- while (!g_queue_is_empty (&closure->pending_queue))
- g_object_unref (g_queue_pop_head (&closure->pending_queue));
g_clear_pointer (&closure->session, g_object_unref);
thread_closure_unref (closure);
pending->state = OSTREE_FETCHER_STATE_COMPLETE;
- /* Now that we've finished downloading, continue with other queued
- * requests.
- */
- session_thread_process_pending_queue (pending->thread_closure);
-
if (!pending->is_membuf)
{
if (stbuf.st_size < pending->content_length)
gpointer user_data);
static void
-remove_pending_rerun_queue (OstreeFetcherPendingURI *pending)
+remove_pending (OstreeFetcherPendingURI *pending)
{
/* Hold a temporary ref to ensure the reference to
* pending->thread_closure is valid.
*/
pending_uri_ref (pending);
g_hash_table_remove (pending->thread_closure->outstanding, pending);
- session_thread_process_pending_queue (pending->thread_closure);
pending_uri_unref (pending);
}
if (local_error)
{
g_task_return_error (task, local_error);
- remove_pending_rerun_queue (pending);
+ remove_pending (pending);
}
g_object_unref (task);
g_strdup (pending->out_tmpfile),
(GDestroyNotify) g_free);
}
- remove_pending_rerun_queue (pending);
+ remove_pending (pending);
}
else
{
if (local_error)
{
g_task_return_error (task, local_error);
- remove_pending_rerun_queue (pending);
+ remove_pending (pending);
}
g_object_unref (task);
g_task_return_pointer (task,
g_strdup (pending->out_tmpfile),
(GDestroyNotify) g_free);
- remove_pending_rerun_queue (pending);
+ remove_pending (pending);
goto out;
}
else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
goto out;
(void) g_input_stream_close (pending->request_body, NULL, NULL);
- g_queue_insert_sorted (&pending->thread_closure->pending_queue,
- g_object_ref (task), pending_task_compare,
- NULL);
- remove_pending_rerun_queue (pending);
+
+ start_pending_request (pending->thread_closure, task);
}
else
{
if (pending->request_body)
(void) g_input_stream_close (pending->request_body, NULL, NULL);
g_task_return_error (task, local_error);
- remove_pending_rerun_queue (pending);
+ remove_pending (pending);
}
g_object_unref (task);